package net.bwie.jtp.dwd.log.job;

import net.bwie.jtp.dwd.log.function.JtpLogClickHouse;
import net.bwie.realtime.jtp.common.utils.KafkaUtil;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JtpLogEtlDwdJob {
    public static void main(String[] args) throws Exception {


        //1-创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


        //2-数据源source
        DataStream<String> pageStream = KafkaUtil.consumerKafka(env, "dwd-traffic-page-log");
        DataStream<String> startStream = KafkaUtil.consumerKafka(env, "dwd-traffic-start-log");
        DataStream<String> actionStream = KafkaUtil.consumerKafka(env, "dwd-traffic-action-log");


        //3-数据sink
//        JtpLogClickHouse.savePageLog(pageStream);
        JtpLogClickHouse.saveActionLog(actionStream);
//        JtpLogClickHouse.saveStartLog(startStream);

        //4-执行
        env.execute("JtpLogEtlDwdJob");



    }
}
